/*
 * Copyright (c) Kumo Inc. and affiliates.
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <melon/observer/detail/observer_manager.h>

#include <future>

#include <melon/exception_string.h>
#include <melon/format.h>
#include <melon/range.h>
#include <melon/singleton.h>
#include <melon/concurrency/unbounded_queue.h>
#include <melon/config.h>
#include <melon/system/thread_name.h>

namespace melon {
    namespace observer_detail {
        thread_local bool ObserverManager::inManagerThread_{false};
        thread_local ObserverManager::DependencyRecorder::Dependencies *
                ObserverManager::DependencyRecorder::currentDependencies_{nullptr};

        namespace {
            constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"};
            constexpr size_t kNextBatchSize{1024};

            auto &currentQueue() {
                static melon::Indestructible<UMPMCQueue<Function<void()>, true> > instance;
                return *instance;
            }

            auto &nextQueue() {
                static melon::Indestructible<UMPSCQueue<Function<Core::Ptr()>, true> >
                        instance;
                return *instance;
            }
        } // namespace

        class ObserverManager::UpdatesManager::CurrentQueueProcessor {
        public:
            CurrentQueueProcessor() {
                if (turbo::get_flag(FLAGS_observer_manager_pool_size) < 1) {
                    KLOG(ERROR) << "--observer_manager_pool_size should be >= 1";
                    turbo::set_flag(&FLAGS_observer_manager_pool_size, 1);
                }
                for (int32_t i = 0; i < turbo::get_flag(FLAGS_observer_manager_pool_size); ++i) {
                    threads_.emplace_back([this, i]() {
                        melon::setThreadName(
                            melon::sformat("{}{}", kObserverManagerThreadNamePrefix, i));
                        ObserverManager::inManagerThread_ = true;

                        while (true) {
                            Function<void()> task;
                            queue_.dequeue(task);

                            if (!task) {
                                return;
                            }

                            try {
                                task();
                            } catch (...) {
                                KLOG(ERROR) << "Exception while running CurrentQueue task: "
                       << exceptionStr(current_exception());
                            }
                        }
                    });
                }
            }

            ~CurrentQueueProcessor() {
                for (size_t i = 0; i < threads_.size(); ++i) {
                    queue_.enqueue(nullptr);
                }

                for (auto &thread: threads_) {
                    thread.join();
                }
            }

        private:
            UMPMCQueue<Function<void()>, true> &queue_{currentQueue()};
            std::vector<std::thread> threads_;
        };

        class ObserverManager::UpdatesManager::NextQueueProcessor {
        public:
            NextQueueProcessor() {
                thread_ = std::thread([&]() {
                    auto &manager = getInstance();

                    melon::setThreadName(
                        melon::sformat("{}NQ", kObserverManagerThreadNamePrefix));

                    Function<Core::Ptr()> queueCoreFunc;

                    while (!stop_) {
                        std::vector<Core::Ptr> cores;
                        queue_.dequeue(queueCoreFunc);
                        do {
                            {
                                if (auto queueCore = queueCoreFunc()) {
                                    cores.emplace_back(std::move(queueCore));
                                }
                            }
                        } while (!stop_ && cores.size() < kNextBatchSize &&
                                 queue_.try_dequeue(queueCoreFunc)); {
                            std::unique_lock wh(manager.versionMutex_);

                            // We can't pick more tasks from the queue after we bumped the
                            // version, so we have to do this while holding the lock.

                            for (auto &corePtr: cores) {
                                corePtr->setForceRefresh();
                            }

                            ++manager.version_;
                        }

                        for (auto &core: cores) {
                            manager.scheduleRefresh(std::move(core), manager.version_);
                        } {
                            auto wEmptyWaiters = emptyWaiters_.wlock();
                            // We don't want any new waiters to be added while we are checking the
                            // queue.
                            if (queue_.empty()) {
                                for (auto &promise: *wEmptyWaiters) {
                                    promise.set_value();
                                }
                                wEmptyWaiters->clear();
                            }
                        }
                    }
                });
            }

            ~NextQueueProcessor() {
                stop_ = true;
                // Write to the queue to notify the thread.
                queue_.enqueue([]() -> Core::Ptr { return nullptr; });
                thread_.join();
            }

            void waitForEmpty() {
                std::promise<void> promise;
                auto future = promise.get_future();
                emptyWaiters_.wlock()->push_back(std::move(promise));

                // Write to the queue to notify the thread.
                queue_.enqueue([]() -> Core::Ptr { return nullptr; });

                future.get();
            }

        private:
            UMPSCQueue<Function<Core::Ptr()>, true> &queue_{nextQueue()};
            std::thread thread_;
            std::atomic<bool> stop_{false};
            melon::Synchronized<std::vector<std::promise<void> > > emptyWaiters_;
        };

        ObserverManager::UpdatesManager::UpdatesManager() {
            currentQueueProcessor_ = std::make_unique<CurrentQueueProcessor>();
            nextQueueProcessor_ = std::make_unique<NextQueueProcessor>();
        }

        void ObserverManager::scheduleCurrent(Function<void()> task) {
            currentQueue().enqueue(std::move(task));
            getUpdatesManager();
        }

        void ObserverManager::scheduleNext(Function<Core::Ptr()> coreFunc) {
            nextQueue().enqueue(std::move(coreFunc));
            getUpdatesManager();
        }

        bool ObserverManager::tryWaitForAllUpdatesImpl(TryWaitForAllUpdatesImplOp op) {
            if (auto updatesManager = getUpdatesManager()) {
                return updatesManager->tryWaitForAllUpdatesImpl(op);
            }
            return true;
        }

        bool ObserverManager::UpdatesManager::tryWaitForAllUpdatesImpl(
            TryWaitForAllUpdatesImplOp op) {
            auto &instance = ObserverManager::getInstance();
            nextQueueProcessor_->waitForEmpty();
            // Wait for all readers to release the lock.
            return op(instance.versionMutex_).owns_lock();
        }

        struct ObserverManager::Singleton {
            static melon::Singleton<UpdatesManager> instance;
            // MSVC 2015 doesn't let us access ObserverManager's constructor if we
            // try to use a lambda to initialize instance, so we have to create
            // an actual function instead.
            static UpdatesManager *createManager() { return new UpdatesManager(); }
        };

        melon::Singleton<ObserverManager::UpdatesManager>
        ObserverManager::Singleton::instance =
                melon::Singleton<ObserverManager::UpdatesManager>(createManager)
                .shouldEagerInitOnReenable();

        std::shared_ptr<ObserverManager::UpdatesManager>
        ObserverManager::getUpdatesManager() {
            return Singleton::instance.try_get();
        }

        ObserverManager &ObserverManager::getInstance() {
            static auto instance = new ObserverManager();
            return *instance;
        }
    } // namespace observer_detail
} // namespace melon
